package com.huan.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 构建配置对象
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程序
        int status = ToolRunner.run(configuration, new TopNDriver(), args);
        // 退出程序
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 构建Job对象实例 参数（配置对象，Job对象名称）
        Job job = Job.getInstance(getConf(), "topN");
        // 设置mr程序运行的主类
        job.setJarByClass(TopNDriver.class);
        // 设置mr程序运行的 mapper类型和reduce类型
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        // 指定mapper阶段输出的kv数据类型
        job.setMapOutputKeyClass(OrderVo.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce阶段输出的kv数据类型，业务mr程序输出的最终类型
        job.setOutputKeyClass(OrderVo.class);
        job.setOutputValueClass(NullWritable.class);
        // 配置本例子中的输入数据路径和输出数据路径，默认输入输出组件为： TextInputFormat和TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 先删除输出目录（方便本地测试）
        FileSystem.get(this.getConf()).delete(new Path(args[1]), true);

        // 设置分组
        job.setGroupingComparatorClass(TopNGroupingComparator.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}